package org.rocketmq.config;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.rocketmq.annotation.RocketMQMessageListener;
import org.rocketmq.enums.ConsumeMode;
import org.rocketmq.enums.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.util.Assert;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * @Author: zd
 * @Date: 2023-02-06 10:38
 * @Description: 消费者封装
 */
public class RocketMQConsumer implements ApplicationContextAware, Closeable, SmartInitializingSingleton {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class);

    private ConfigurableApplicationContext context;
    private String namesrvAddr;
    private List<DefaultMQPushConsumer> consumerList = new ArrayList<>();
    private AtomicLong counter = new AtomicLong(0);

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = (ConfigurableApplicationContext) applicationContext;
    }
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    @Override
    public void close() throws IOException {
        for (DefaultMQPushConsumer consumer : consumerList) {
            consumer.shutdown();
            log.info("A defaultMQPushConsumer shutdown on namesrc {}", namesrvAddr);
        }
    }

    @Override
    public void afterSingletonsInstantiated() {
        // 获取所有RocketMQMessageListener注解标记的bean
        Map<String, Object> beans = this.context.getBeansWithAnnotation(RocketMQMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        // 注册
        beans.forEach(this::registerConsumer);
    }

    private void registerConsumer(String beanName, Object bean) {
        Class<?> clazz = bean.getClass();
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);

        String containerBeanName = String.format("%s_%s", beanName, counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) context;

        genericApplicationContext.registerBean(containerBeanName, DefaultMQPushConsumer.class,
                () -> createRocketMQConsumer(containerBeanName, bean, annotation));

        DefaultMQPushConsumer consumer = genericApplicationContext.getBean(containerBeanName, DefaultMQPushConsumer.class);
        try {
            consumer.start();
            log.info("A consumer as {} init on namesrc {}", beanName, namesrvAddr);
        } catch (MQClientException e) {
            throw new RuntimeException("A consumer as " + beanName + " start fail, " + e.getMessage());
        }
    }

    private DefaultMQPushConsumer createRocketMQConsumer(String containerBeanName, Object bean, RocketMQMessageListener annotation) {
        String topic = annotation.topic();
        String consumerGroup = annotation.consumerGroup();

        Assert.hasText(topic, containerBeanName.substring(0,containerBeanName.indexOf("_")) + " topic 不允许为空");
        Assert.hasText(consumerGroup, containerBeanName.substring(0,containerBeanName.indexOf("_")) + " consumerGroup 不允许为空");

        String selectorExpression = annotation.selectorExpression();
        ConsumeMode consumeMode = annotation.consumeMode();
        MessageModel messageModel = annotation.messageModel();

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            consumer.subscribe(topic, selectorExpression);
            switch (messageModel) {
                case BROADCASTING:
                    consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                    break;
                case CLUSTERING:
                    consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                    break;
                default:
                    throw new IllegalArgumentException("Property 'messageModel' was wrong.");
            }

            RocketMQListener listener = (RocketMQListener) bean;
            if (consumeMode == ConsumeMode.ORDERLY) {
                // 顺序消费
                consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
                    for (MessageExt ext: msgs) {
                        log.info("Orderly message, {}", ext);
                        listener.onMessage(new String(ext.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                });
            } else {
                // 并发
                consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                    for (MessageExt ext: msgs) {
                        log.info("Concurrently message, {}", ext);
                        listener.onMessage(new String(ext.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });

            }
        } catch (MQClientException e) {
            throw new RuntimeException("A consumer as " + containerBeanName + " registerBean fail, " + e.getMessage());
        }
        consumerList.add(consumer);
        return consumer;
    }
}
